package com.dzx.spark.web.utils;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Utility class for HBase operations (lazily-initialized singleton).
 */

public class HBaseUtils {

    /**
     * Lazily-created singleton. NOTE(review): kept {@code public} for backward
     * compatibility with any existing callers; new code should always go through
     * {@link #getInstance()}, and this field should eventually become private.
     */
    public static HBaseUtils instance = null;

    HBaseAdmin admin = null;

    Configuration configuration = null;

    /**
     * Connects to the hadoop000 cluster. ZooKeeper quorum and HBase root dir are
     * hard-coded; adjust here when pointing at a different cluster.
     *
     * @throws IllegalStateException if the HBase connection cannot be established
     */
    private HBaseUtils() {
        configuration = new Configuration();
        configuration.set("hbase.zookeeper.quorum", "hadoop000:2181");
        configuration.set("hbase.rootdir", "hdfs://hadoop000:8020/hbase");

        try {
            admin = new HBaseAdmin(configuration);
        } catch (IOException e) {
            // Fail fast instead of leaving a half-initialized singleton whose
            // admin is null (the original swallowed this and printed the trace).
            throw new IllegalStateException("Failed to connect to HBase", e);
        }
    }

    /**
     * Returns the process-wide instance, creating (and connecting) it on first use.
     *
     * @return the shared {@code HBaseUtils} instance
     */
    public static synchronized HBaseUtils getInstance() {
        if (instance == null) {
            instance = new HBaseUtils();
        }
        return instance;
    }

    /**
     * Opens an {@link HTable} handle for the given table.
     * The caller is responsible for closing the returned table.
     *
     * @param tableName name of the HBase table
     * @return the table handle, or {@code null} if it could not be opened
     */
    public HTable getTable(String tableName) {
        HTable hTable = null;
        try {
            hTable = new HTable(configuration, tableName);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return hTable;
    }

    /**
     * Writes a single cell to an HBase table. Errors are logged, not propagated
     * (best-effort, matching the original behavior); the table handle is always closed.
     *
     * @param tableName table name
     * @param rowKey    row key
     * @param cf        column family
     * @param column    column qualifier
     * @param value     cell value to write
     */
    public void put(String tableName, String rowKey, String cf, String column, String value) {
        Put put = new Put(Bytes.toBytes(rowKey));
        put.add(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(value));
        // try-with-resources: the original leaked the HTable handle on every call.
        try (HTable table = getTable(tableName)) {
            table.put(put);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Scans the table for rows whose key starts with the given prefix and returns
     * the {@code info:click_count} counter for each matching row.
     *
     * @param tableName table to scan
     * @param dayCourse row-key prefix (e.g. {@code "20171117"})
     * @return map of row key to click count; rows missing the counter cell are skipped
     * @throws IOException if the scan fails
     */
    public Map<String, Long> query(String tableName, String dayCourse) throws IOException {
        Map<String, Long> map = new HashMap<String, Long>();

        byte[] cf = Bytes.toBytes("info");
        byte[] qualifier = Bytes.toBytes("click_count");

        Scan scan = new Scan();
        Filter filter = new PrefixFilter(Bytes.toBytes(dayCourse));
        scan.setFilter(filter);

        // try-with-resources: the original leaked both the table and the scanner.
        try (HTable table = getTable(tableName);
             ResultScanner scanner = table.getScanner(scan)) {
            for (Result result : scanner) {
                String row = Bytes.toString(result.getRow());
                byte[] raw = result.getValue(cf, qualifier);
                // getValue returns null when the cell is absent; the original
                // would have thrown an NPE inside Bytes.toLong here.
                if (raw != null) {
                    map.put(row, Bytes.toLong(raw));
                }
            }
        }
        return map;
    }

    public static void main(String[] args) throws IOException {
//        HTable hTable = HBaseUtils.getInstance().getTable("imooc_course_clickcount");
//        System.out.println(hTable.getName().getNameAsString());

        // Smoke test: insert one record.
        String tableName = "imooc_course_clickcount";
        String rowKey = "20171111_88";
        String cf = "info";
        String column = "click_count";
        String value = "2";

        HBaseUtils.getInstance().put(tableName, rowKey, cf, column, value);

        // In the HBase shell: scan 'imooc_course_clickcount' to inspect the table.
        Map<String, Long> imooc_course_clickcount = HBaseUtils.getInstance().query("imooc_course_clickcount", "20171117");
        for (Map.Entry<String, Long> entry : imooc_course_clickcount.entrySet()) {
            System.out.println(entry.getKey() + ":" + entry.getValue());
        }

        /** Sample output:
         *   20171117_120 1536
         *   20171117_135 2304
         *   20171117_96  45623
         */
    }

}
